﻿using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using System.Reflection;
using Go;

namespace GoRpc
{
    using methods_map = Dictionary<string, Func<async_result_wrap<object, go_rpc.reply_state>, object[], Task>>;
    using reply_map = Dictionary<long, tuple<async_timer, csp_wait_wrap<tuple<go_rpc.reply_state, object, string>, tuple<int, string, object[]>>>>;

    public static class go_rpc
    {
        // One remote invocation as read from the wire by server.session.
        // NOTE(review): instances are (de)serialized with BinaryFormatter, so the field
        // names are presumably part of the wire format — do not rename without checking peers.
        [Serializable]
        struct request
        {
            // Correlation id; server.session copies it into the matching reply.id.
            public long id;
            // Key looked up in methods._methodsMap to find the bound handler.
            public string name;
            // Positional arguments handed to the bound handler as object[] args.
            public object[] param;
        }

        // Outcome of one request, queued on the session's reply channel and written back to the peer.
        // NOTE(review): serialized with BinaryFormatter, so the field names are presumably
        // part of the wire format — do not rename without checking peers.
        [Serializable]
        struct reply
        {
            // Copied from request.id so the reply can be matched to its request.
            public long id;
            // Result code; reply_state.undefined makes the session writer emit an empty (length 0) frame.
            public reply_state state;
            // Value produced by the handler (async_result_wrap.value1); cleared when it cannot be serialized.
            public object result;
            // Exception message when the handler threw; otherwise left unset.
            public string message;
        }

        /// <summary>
        /// Result code carried by every reply. The numeric values are spelled out because the
        /// enum travels inside serialized replies; they equal the implicit sequential numbering.
        /// </summary>
        public enum reply_state
        {
            /// <summary>Default value; the session uses it to mark an empty (zero-length) reply frame.</summary>
            undefined = 0,
            /// <summary>The handler ran to completion.</summary>
            success = 1,
            /// <summary>Presumably: request arguments exceeded the frame buffer (not set in the visible code).</summary>
            param_so_big = 2,
            /// <summary>The result did not fit the reply buffer when serialized.</summary>
            result_so_big = 3,
            /// <summary>Presumably: an argument could not be serialized (not set in the visible code).</summary>
            param_nonserialized = 4,
            /// <summary>Presumably: the call timed out (not set in the visible code).</summary>
            remote_overtime = 5,
            /// <summary>The handler was interrupted by a generator stop.</summary>
            remote_stopped = 6,
            /// <summary>The handler threw, the result was not serializable, or no overload matched.</summary>
            remote_exception = 7,
            /// <summary>No handler is registered under the requested name.</summary>
            remote_fail = 8,
            /// <summary>The caller's channel to the connection is no longer usable.</summary>
            remote_disconnect = 9
        }

        /// <summary>
        /// Exception raised on the calling side when an RPC does not end in
        /// <see cref="reply_state.success"/>; <see cref="code"/> carries the reply state.
        /// </summary>
        public class exception : Exception
        {
            /// <summary>Reply state that caused this exception.</summary>
            public reply_state code;

            /// <param name="st">Reply state to expose through <see cref="code"/>.</param>
            /// <param name="msg">Optional message forwarded to <see cref="Exception"/>.</param>
            public exception(reply_state st, string msg = null)
                : base(msg)
            {
                this.code = st;
            }
        }

        public class methods
        {
            // Reflection data cached by bind_obj for one method of the bound object.
            struct bind_obj_info
            {
                // True when the method's return type is named "Task" or "Task`1".
                public bool isTask;
                // The reflected method that gets invoked for a request.
                public MethodInfo method;
                // Reflected "AsTask" method; only looked up when the return type is named "ValueTask`1".
                public MethodInfo valueTask;
                // Reflected "get_Result" accessor used to extract the awaited value; null when none was resolved.
                public MethodInfo getResult;
            }

            protected bool _noBind = false;
            internal methods_map _methodsMap = new methods_map();

            /// <summary>
            /// Registers an asynchronous handler without arguments or result under <paramref name="name"/>.
            /// Adding a name that is already registered throws (Dictionary.Add).
            /// </summary>
            public void bind(string name, Func<Task> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                Func<async_result_wrap<object, reply_state>, object[], Task> entry = (res, args) => handler();
                _methodsMap.Add(name, entry);
            }

            /// <summary>
            /// Registers an asynchronous one-argument handler; the argument is taken from
            /// <c>args[0]</c> and cast to <typeparamref name="T1"/> at call time.
            /// </summary>
            public void bind<T1>(string name, Func<T1, Task> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                Func<async_result_wrap<object, reply_state>, object[], Task> entry = (res, args) => handler((T1)args[0]);
                _methodsMap.Add(name, entry);
            }

            /// <summary>
            /// Registers an asynchronous two-argument handler; arguments are taken from
            /// <c>args[0..1]</c> and cast at call time.
            /// </summary>
            public void bind<T1, T2>(string name, Func<T1, T2, Task> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                Func<async_result_wrap<object, reply_state>, object[], Task> entry = (res, args) => handler((T1)args[0], (T2)args[1]);
                _methodsMap.Add(name, entry);
            }

            /// <summary>
            /// Registers an asynchronous three-argument handler; arguments are taken from
            /// <c>args[0..2]</c> and cast at call time.
            /// </summary>
            public void bind<T1, T2, T3>(string name, Func<T1, T2, T3, Task> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                Func<async_result_wrap<object, reply_state>, object[], Task> entry = (res, args) => handler((T1)args[0], (T2)args[1], (T3)args[2]);
                _methodsMap.Add(name, entry);
            }

            /// <summary>
            /// Slow path for Task-returning handlers: awaits <paramref name="task"/> and stores
            /// its value in <c>res.value1</c>.
            /// </summary>
            private async Task bind_wait<R>(async_result_wrap<object, reply_state> res, Task<R> task)
            {
                R completed = await task;
                res.value1 = completed;
            }

            /// <summary>
            /// Registers an asynchronous handler that yields a result. When the returned task has
            /// already completed the result is stored synchronously; otherwise it is awaited via bind_wait.
            /// </summary>
            public void bind<R>(string name, Func<Task<R>> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    Task<R> pending = handler();
                    if (pending.IsCompleted)
                    {
                        // Fast path: no state machine needed for an already finished task.
                        res.value1 = pending.GetAwaiter().GetResult();
                        return generator.non_async();
                    }
                    return bind_wait(res, pending);
                });
            }

            /// <summary>Same as the parameterless overload, with <c>args[0]</c> cast to <typeparamref name="T1"/>.</summary>
            public void bind<R, T1>(string name, Func<T1, Task<R>> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    Task<R> pending = handler((T1)args[0]);
                    if (pending.IsCompleted)
                    {
                        res.value1 = pending.GetAwaiter().GetResult();
                        return generator.non_async();
                    }
                    return bind_wait(res, pending);
                });
            }

            /// <summary>Same as the parameterless overload, with <c>args[0..1]</c> cast to the handler's parameter types.</summary>
            public void bind<R, T1, T2>(string name, Func<T1, T2, Task<R>> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    Task<R> pending = handler((T1)args[0], (T2)args[1]);
                    if (pending.IsCompleted)
                    {
                        res.value1 = pending.GetAwaiter().GetResult();
                        return generator.non_async();
                    }
                    return bind_wait(res, pending);
                });
            }

            /// <summary>Same as the parameterless overload, with <c>args[0..2]</c> cast to the handler's parameter types.</summary>
            public void bind<R, T1, T2, T3>(string name, Func<T1, T2, T3, Task<R>> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    Task<R> pending = handler((T1)args[0], (T2)args[1], (T3)args[2]);
                    if (pending.IsCompleted)
                    {
                        res.value1 = pending.GetAwaiter().GetResult();
                        return generator.non_async();
                    }
                    return bind_wait(res, pending);
                });
            }

            /// <summary>
            /// Slow path for ValueTask-returning handlers: awaits <paramref name="task"/> and
            /// stores its value in <c>res.value1</c>.
            /// </summary>
            private async Task bind_wait<R>(async_result_wrap<object, reply_state> res, ValueTask<R> task)
            {
                R completed = await task;
                res.value1 = completed;
            }

            /// <summary>
            /// Registers a ValueTask-returning handler. A ValueTask that has already completed is
            /// read synchronously; otherwise it is awaited via bind_wait.
            /// </summary>
            public void bind<R>(string name, Func<ValueTask<R>> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    ValueTask<R> pending = handler();
                    if (pending.IsCompleted)
                    {
                        // Fast path: the value is available without suspending.
                        res.value1 = pending.GetAwaiter().GetResult();
                        return generator.non_async();
                    }
                    return bind_wait(res, pending);
                });
            }

            /// <summary>Same as the parameterless overload, with <c>args[0]</c> cast to <typeparamref name="T1"/>.</summary>
            public void bind<R, T1>(string name, Func<T1, ValueTask<R>> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    ValueTask<R> pending = handler((T1)args[0]);
                    if (pending.IsCompleted)
                    {
                        res.value1 = pending.GetAwaiter().GetResult();
                        return generator.non_async();
                    }
                    return bind_wait(res, pending);
                });
            }

            /// <summary>Same as the parameterless overload, with <c>args[0..1]</c> cast to the handler's parameter types.</summary>
            public void bind<R, T1, T2>(string name, Func<T1, T2, ValueTask<R>> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    ValueTask<R> pending = handler((T1)args[0], (T2)args[1]);
                    if (pending.IsCompleted)
                    {
                        res.value1 = pending.GetAwaiter().GetResult();
                        return generator.non_async();
                    }
                    return bind_wait(res, pending);
                });
            }

            /// <summary>Same as the parameterless overload, with <c>args[0..2]</c> cast to the handler's parameter types.</summary>
            public void bind<R, T1, T2, T3>(string name, Func<T1, T2, T3, ValueTask<R>> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    ValueTask<R> pending = handler((T1)args[0], (T2)args[1], (T3)args[2]);
                    if (pending.IsCompleted)
                    {
                        res.value1 = pending.GetAwaiter().GetResult();
                        return generator.non_async();
                    }
                    return bind_wait(res, pending);
                });
            }

            /// <summary>
            /// Registers a synchronous handler without arguments or result. The handler runs
            /// inline in the registered delegate, which then returns <c>generator.non_async()</c>.
            /// </summary>
            public void bind(string name, Action handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    handler();
                    return generator.non_async();
                });
            }

            /// <summary>Synchronous one-argument handler; <c>args[0]</c> is cast to <typeparamref name="T1"/>.</summary>
            public void bind<T1>(string name, Action<T1> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    handler((T1)args[0]);
                    return generator.non_async();
                });
            }

            /// <summary>Synchronous two-argument handler; <c>args[0..1]</c> are cast to the parameter types.</summary>
            public void bind<T1, T2>(string name, Action<T1, T2> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    handler((T1)args[0], (T2)args[1]);
                    return generator.non_async();
                });
            }

            /// <summary>Synchronous three-argument handler; <c>args[0..2]</c> are cast to the parameter types.</summary>
            public void bind<T1, T2, T3>(string name, Action<T1, T2, T3> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    handler((T1)args[0], (T2)args[1], (T3)args[2]);
                    return generator.non_async();
                });
            }

            /// <summary>
            /// Registers a synchronous handler that returns a value; the value is stored in
            /// <c>res.value1</c> before the registered delegate returns <c>generator.non_async()</c>.
            /// </summary>
            public void bind<R>(string name, Func<R> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    R produced = handler();
                    res.value1 = produced;
                    return generator.non_async();
                });
            }

            /// <summary>Synchronous one-argument handler with result; <c>args[0]</c> is cast to <typeparamref name="T1"/>.</summary>
            public void bind<R, T1>(string name, Func<T1, R> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    R produced = handler((T1)args[0]);
                    res.value1 = produced;
                    return generator.non_async();
                });
            }

            /// <summary>Synchronous two-argument handler with result; <c>args[0..1]</c> are cast to the parameter types.</summary>
            public void bind<R, T1, T2>(string name, Func<T1, T2, R> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    R produced = handler((T1)args[0], (T2)args[1]);
                    res.value1 = produced;
                    return generator.non_async();
                });
            }

            /// <summary>Synchronous three-argument handler with result; <c>args[0..2]</c> are cast to the parameter types.</summary>
            public void bind<R, T1, T2, T3>(string name, Func<T1, T2, T3, R> handler)
            {
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                _methodsMap.Add(name, (res, args) =>
                {
                    R produced = handler((T1)args[0], (T2)args[1], (T3)args[2]);
                    res.value1 = produced;
                    return generator.non_async();
                });
            }

            /// <summary>
            /// Exposes the members of <paramref name="obj"/> through reflection.
            /// Every method (public or not, instance or static) is registered as
            /// "<paramref name="name"/>.MethodName"; every field is registered as
            /// "<paramref name="name"/>.get@Field" and "<paramref name="name"/>.set@Field".
            /// Methods sharing a name share one entry and are told apart at call time by the
            /// exact runtime types of the arguments.
            /// </summary>
            /// <param name="name">Prefix placed in front of every registered member name.</param>
            /// <param name="obj">Object whose members are exposed; it is also the invocation target.</param>
            /// <exception cref="ArgumentException">A "name.Method" key is already registered.</exception>
            public void bind_obj(string name, object obj)
            {
                // Same guard as every other bind overload: no registrations once the service runs.
                Debug.Assert(!_noBind, "不正确的 bind 操作，服务已经启动");
                const BindingFlags everyMember = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Static;
                const BindingFlags publicInstance = BindingFlags.Instance | BindingFlags.Public;

                MethodInfo[] reflected = obj.GetType().GetMethods(everyMember);
                Dictionary<string, LinkedList<bind_obj_info>> overrideMethod = new Dictionary<string, LinkedList<bind_obj_info>>();
                for (int i = 0; i < reflected.Length; i++)
                {
                    bind_obj_info methodInfo = new bind_obj_info { method = reflected[i] };
                    Type returnType = methodInfo.method.ReturnType;
                    if ((methodInfo.isTask = ("Task`1" == returnType.Name || "Task" == returnType.Name)) &&
                        "System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]" != returnType.ToString())
                    {
                        // Non-generic Task has no get_Result, so getResult stays null for it.
                        methodInfo.getResult = returnType.GetMethod("get_Result", publicInstance);
                    }
                    else if ("ValueTask`1" == returnType.Name)
                    {
                        methodInfo.valueTask = returnType.GetMethod("AsTask", publicInstance);
                        if (null != methodInfo.valueTask)
                        {
                            // A ValueTask must be consumed only once. It is converted with AsTask(),
                            // so the result has to be read from the resulting Task<T>, never from
                            // the ValueTask a second time.
                            methodInfo.getResult = methodInfo.valueTask.ReturnType.GetMethod("get_Result", publicInstance);
                        }
                    }

                    string methodName = methodInfo.method.Name;
                    LinkedList<bind_obj_info> overloads;
                    if (!overrideMethod.TryGetValue(methodName, out overloads))
                    {
                        overloads = new LinkedList<bind_obj_info>();
                        overrideMethod.Add(methodName, overloads);
                        // The handler keeps the list itself, so overloads appended by later
                        // iterations are visible when a request arrives.
                        LinkedList<bind_obj_info> candidates = overloads;
                        _methodsMap.Add(string.Format("{0}.{1}", name, methodName),
                            (res, args) => dispatch_bound(candidates, obj, args, res));
                    }
                    overloads.AddLast(methodInfo);
                }

                FieldInfo[] fields = obj.GetType().GetFields(everyMember);
                for (int i = 0; i < fields.Length; i++)
                {
                    try
                    {
                        FieldInfo field = fields[i];
                        _methodsMap.Add(string.Format("{0}.get@{1}", name, field.Name), delegate (async_result_wrap<object, reply_state> res, object[] args)
                        {
                            res.value1 = field.GetValue(obj);
                            return generator.non_async();
                        });
                        _methodsMap.Add(string.Format("{0}.set@{1}", name, field.Name), delegate (async_result_wrap<object, reply_state> res, object[] args)
                        {
                            field.SetValue(obj, args[0]);
                            return generator.non_async();
                        });
                    }
                    // Best effort: an accessor name that is already registered is skipped.
                    catch (System.ArgumentException) { }
                }
            }

            // Picks the overload matching the supplied arguments and invokes it; reports
            // remote_exception through res.value2 when no overload fits.
            private static async Task dispatch_bound(LinkedList<bind_obj_info> candidates, object obj, object[] args, async_result_wrap<object, reply_state> res)
            {
                if (1 == candidates.Count)
                {
                    // Single method: no matching needed, reflection validates the arguments.
                    await invoke_bound(candidates.First.Value, obj, args, res);
                    return;
                }
                // A request without an argument array is treated like an empty argument list.
                int argCount = null == args ? 0 : args.Length;
                for (LinkedListNode<bind_obj_info> it = candidates.First; null != it; it = it.Next)
                {
                    ParameterInfo[] parameters = it.Value.method.GetParameters();
                    if (parameters.Length != argCount)
                    {
                        continue;
                    }
                    int j = 0;
                    for (; j < argCount && accepts_argument(parameters[j].ParameterType, args[j]); j++) { }
                    if (j == argCount)
                    {
                        await invoke_bound(it.Value, obj, args, res);
                        return;
                    }
                }
                res.value2 = reply_state.remote_exception;
            }

            // True when arg can be passed for a parameter of parameterType: non-null values
            // must have exactly that runtime type, null fits any reference or Nullable<T> parameter.
            private static bool accepts_argument(Type parameterType, object arg)
            {
                if (null == arg)
                {
                    return !parameterType.IsValueType || null != Nullable.GetUnderlyingType(parameterType);
                }
                return parameterType == arg.GetType();
            }

            // Invokes one reflected method and stores its (awaited) result in res.value1.
            private static async Task invoke_bound(bind_obj_info target, object obj, object[] args, async_result_wrap<object, reply_state> res)
            {
                if (target.isTask)
                {
                    Task running = (Task)target.method.Invoke(obj, args);
                    await running;
                    res.value1 = null == target.getResult ? null : target.getResult.Invoke(running, null);
                }
                else if (null != target.valueTask)
                {
                    object pending = target.method.Invoke(obj, args);
                    // Consume the ValueTask exactly once by converting it to a Task<T>.
                    Task running = (Task)target.valueTask.Invoke(pending, null);
                    await running;
                    res.value1 = target.getResult.Invoke(running, null);
                }
                else
                {
                    res.value1 = await generator.send_task(() => target.method.Invoke(obj, args));
                }
            }

            // bind_block family: registers a synchronous handler, but runs it through
            // generator.send_task instead of calling it inline like the plain bind overloads.
            // NOTE(review): send_task presumably executes the delegate off the strand so a
            // blocking handler does not stall the session — confirm against the Go library.

            /// <summary>Registers a blocking handler without arguments or result.</summary>
            public void bind_block(string name, Action handler)
            {
                bind(name, () => generator.send_task(handler));
            }

            /// <summary>Registers a blocking one-argument handler without result.</summary>
            public void bind_block<T1>(string name, Action<T1> handler)
            {
                bind(name, (T1 p1) => generator.send_task(() => handler(p1)));
            }

            /// <summary>Registers a blocking two-argument handler without result.</summary>
            public void bind_block<T1, T2>(string name, Action<T1, T2> handler)
            {
                bind(name, (T1 p1, T2 p2) => generator.send_task(() => handler(p1, p2)));
            }

            /// <summary>Registers a blocking three-argument handler without result.</summary>
            public void bind_block<T1, T2, T3>(string name, Action<T1, T2, T3> handler)
            {
                bind(name, (T1 p1, T2 p2, T3 p3) => generator.send_task(() => handler(p1, p2, p3)));
            }

            /// <summary>Registers a blocking handler without arguments that returns a value.</summary>
            public void bind_block<R>(string name, Func<R> handler)
            {
                bind(name, () => generator.send_task(handler));
            }

            /// <summary>Registers a blocking one-argument handler that returns a value.</summary>
            public void bind_block<R, T1>(string name, Func<T1, R> handler)
            {
                bind(name, (T1 p1) => generator.send_task(() => handler(p1)));
            }

            /// <summary>Registers a blocking two-argument handler that returns a value.</summary>
            public void bind_block<R, T1, T2>(string name, Func<T1, T2, R> handler)
            {
                bind(name, (T1 p1, T2 p2) => generator.send_task(() => handler(p1, p2)));
            }

            /// <summary>Registers a blocking three-argument handler that returns a value.</summary>
            public void bind_block<R, T1, T2, T3>(string name, Func<T1, T2, T3, R> handler)
            {
                bind(name, (T1 p1, T2 p2, T3 p3) => generator.send_task(() => handler(p1, p2, p3)));
            }

            /// <summary>
            /// Serves <paramref name="methods"/> over an already established
            /// <paramref name="socket"/>: marks the method table as closed for further binds and
            /// starts <c>server.session</c> on <paramref name="strand"/> via <c>generator.tgo</c>.
            /// </summary>
            /// <returns>The generator returned by <c>generator.tgo</c>.</returns>
            public static generator go_session(shared_strand strand, socket socket, methods methods, bool keepalive = false)
            {
                // From here on bind calls trip their Debug.Assert.
                methods._noBind = true;
                var sessionEntry = functional.bind(server.session, socket, methods, keepalive);
                return generator.tgo(strand, sessionEntry);
            }
        }

        /// <summary>
        /// View over a frame buffer laid out as a 4-byte little-endian length header
        /// followed by the payload bytes.
        /// </summary>
        struct rpc_block
        {
            // Number of bytes occupied by the length prefix.
            private const int HeaderSize = 4;

            /// <summary>Backing storage: header in [0, 4), payload from index 4 on.</summary>
            public byte[] buff;

            /// <summary>Payload length stored in the header (little-endian, signed 32-bit).</summary>
            public int length
            {
                get
                {
                    // Fold from the most significant byte down to the least significant one.
                    int decoded = 0;
                    for (int i = HeaderSize - 1; i >= 0; i--)
                    {
                        decoded = (decoded << 8) | buff[i];
                    }
                    return decoded;
                }
                set
                {
                    for (int i = 0; i < HeaderSize; i++)
                    {
                        buff[i] = (byte)((value >> (8 * i)) & 0xff);
                    }
                }
            }

            /// <summary>Largest payload the buffer can hold.</summary>
            public int max_length
            {
                get
                {
                    return buff.Length - HeaderSize;
                }
            }

            /// <summary>Fixed-size stream over the whole payload area (everything after the header).</summary>
            public MemoryStream stream()
            {
                int payloadCapacity = buff.Length - HeaderSize;
                return new MemoryStream(buff, HeaderSize, payloadCapacity);
            }

            /// <summary>Segment covering only the 4 header bytes.</summary>
            public ArraySegment<byte> head()
            {
                return new ArraySegment<byte>(buff, 0, HeaderSize);
            }

            /// <summary>Segment covering <see cref="length"/> payload bytes after the header.</summary>
            public ArraySegment<byte> body()
            {
                int payload = length;
                return new ArraySegment<byte>(buff, HeaderSize, payload);
            }

            /// <summary>Segment covering the header plus <see cref="length"/> payload bytes.</summary>
            public ArraySegment<byte> entirety()
            {
                int total = HeaderSize + length;
                return new ArraySegment<byte>(buff, 0, total);
            }
        }

        public class server : methods
        {
            shared_strand _strand;
            socket_tcp.acceptor _accept;
            generator _server;

            /// <summary>
            /// Creates a server that runs on <paramref name="strand"/>, or on
            /// <c>shared_strand.default_strand()</c> when none is given, and allocates its acceptor.
            /// </summary>
            public server(shared_strand strand = null)
            {
                if (null != strand)
                {
                    _strand = strand;
                }
                else
                {
                    _strand = shared_strand.default_strand();
                }
                _accept = new socket_tcp.acceptor();
            }

            /// <summary>Returns the strand this server was constructed with (or the default strand).</summary>
            public shared_strand self_strand() => _strand;

            /// <summary>
            /// Binds the acceptor to <paramref name="ip"/>:<paramref name="port"/> and, on
            /// success, closes the method table for further binds and starts the accept loop.
            /// </summary>
            /// <returns>True when the acceptor was bound and the accept loop started; otherwise false.</returns>
            public bool run(string ip, int port, bool keepalive = false)
            {
                bool started = false;
                if (_accept.bind(ip, port))
                {
                    _noBind = true;
                    _server = generator.make(_strand, functional.bind(server_action, keepalive));
                    _server.run();
                    started = true;
                }
                return started;
            }

            /// <summary>Stops the accept-loop generator if <c>run</c> has created one.</summary>
            public void close() => _server?.stop();

            /// <summary>
            /// Accept loop: every accepted socket gets its own child generator running
            /// <c>session</c>. When 100 children already exist the new socket is closed and the
            /// loop pauses for 100 (as passed to <c>generator.sleep</c>) before accepting again.
            /// On exit the acceptor is closed and all children are stopped.
            /// </summary>
            private async Task server_action(bool keepalive)
            {
                generator.children sessions = new generator.children();
                try
                {
                    for (;;)
                    {
                        socket_tcp incoming = new socket_tcp();
                        if (!await _accept.accept(incoming))
                        {
                            break;
                        }
                        if (sessions.count >= 100)
                        {
                            // Session limit reached: refuse this connection and back off briefly.
                            incoming.close();
                            await generator.sleep(100);
                            continue;
                        }
                        sessions.free_go(functional.bind(session, incoming, this, keepalive));
                    }
                }
                catch (System.Exception) { }
                finally
                {
                    // Cleanup is bracketed by lock_stop/unlock_stop.
                    generator.lock_stop();
                    _accept.close();
                    await sessions.stop();
                    generator.unlock_stop();
                }
            }

            /// <summary>
            /// Serves one connection. Frames are a 4-byte length header followed by a
            /// BinaryFormatter-serialized <c>request</c>; each request is dispatched to the
            /// handler registered under its name in <paramref name="methods"/> and answered with a
            /// serialized <c>reply</c>. A dedicated child generator owns all socket writes.
            /// </summary>
            /// <param name="socket">Connected socket; closed when the session ends.</param>
            /// <param name="methods">Method table used to look up request handlers.</param>
            /// <param name="keepalive">When true, a timer stops the session unless a frame header
            /// arrives before the 10000 timeout elapses (timer is restarted on every header).</param>
            // NOTE(review): requests read from the socket are deserialized with BinaryFormatter,
            // which is unsafe for untrusted peers — confirm the connection is trusted.
            static public async Task session(socket socket, methods methods, bool keepalive)
            {
                // Replies produced by request handlers are funneled through this channel to the writer child.
                chan<reply> replyChan = chan<reply>.make(10);
                generator.children children = new generator.children();
                // Writer child: serializes replies and writes them to the socket one at a time.
                children.free_go(async delegate ()
                {
                    BinaryFormatter serializer = new BinaryFormatter();
                    rpc_block replyBuf = new rpc_block { buff = new byte[1 * 1024 * 1024] };
                    MemoryStream replyStream = replyBuf.stream();
                    async_result_wrap<socket_result> sckRes = new async_result_wrap<socket_result>();
                    async_result_wrap<chan_recv_wrap<reply>> replyRes = new async_result_wrap<chan_recv_wrap<reply>>();
                    while (true)
                    {
                        await generator.unsafe_chan_receive(replyRes, replyChan);
                        if (chan_state.closed == replyRes.value1.state)
                        {
                            break;
                        }
                        reply rep = replyRes.value1;
                        if (reply_state.undefined == rep.state)
                        {
                            // Empty frame: answer a zero-length request with a header-only frame of length 0.
                            replyBuf.length = 0;
                            await socket.unsafe_write(sckRes, replyBuf.head());
                            if (!sckRes.value1)
                            {
                                break;
                            }
                            continue;
                        }
                        try
                        {
                            replyStream.Position = 0;
                            await generator.send_task(() => serializer.Serialize(replyStream, rep));
                        }
                        // The stream wraps a fixed buffer; presumably thrown when the serialized reply does not fit.
                        catch (System.NotSupportedException)
                        {
                            rep.result = null;
                            rep.state = reply_state.result_so_big;
                            replyStream.Position = 0;
                            await generator.send_task(() => serializer.Serialize(replyStream, rep));
                        }
                        // The result object itself could not be serialized: reply without it.
                        catch (System.Runtime.Serialization.SerializationException)
                        {
                            rep.result = null;
                            rep.state = reply_state.remote_exception;
                            replyStream.Position = 0;
                            await generator.send_task(() => serializer.Serialize(replyStream, rep));
                        }
                        // Bytes written by the serializer become the frame length.
                        replyBuf.length = (int)replyStream.Position;
                        await socket.unsafe_write(sckRes, replyBuf.entirety());
                        if (!sckRes.value1)
                        {
                            break;
                        }
                    }
                });
                rpc_block reqBuf = new rpc_block { buff = new byte[1 * 1024 * 1024] };
                MemoryStream reqStream = reqBuf.stream();
                BinaryFormatter deserializer = new BinaryFormatter();
                async_timer overtimer = keepalive ? new async_timer(generator.self_strand()) : null;
                generator sessSelf = generator.self;
                // With keepalive enabled, the session stops itself when the timer expires.
                overtimer?.timeout(10000, sessSelf.stop);
                try
                {
                    async_result_wrap<socket_result> sckRes = new async_result_wrap<socket_result>();
                    async_result_wrap<chan_send_wrap> replyRes = new async_result_wrap<chan_send_wrap>();
                    while (true)
                    {
                        // Read the 4-byte length header of the next frame.
                        await socket.unsafe_read(sckRes, reqBuf.head());
                        if (!sckRes.value1)
                        {
                            break;
                        }
                        overtimer?.restart();
                        int reqLen = reqBuf.length;
                        if (reqLen <= 0)
                        {
                            // Header-only frame: queue an "undefined" reply so the writer echoes an empty frame.
                            await generator.unsafe_chan_send(replyRes, replyChan, new reply { state = reply_state.undefined });
                            continue;
                        }
                        if (reqLen > reqBuf.max_length)
                        {
                            // Frame larger than the request buffer: end the session.
                            break;
                        }
                        await socket.unsafe_read(sckRes, reqBuf.body());
                        // Also ends the session when more than 11 children (writer included) exist.
                        if (!sckRes.value1 || children.count > 11)
                        {
                            break;
                        }
                        reqStream.Position = 0;
                        request req = await generator.send_task(() => (request)deserializer.Deserialize(reqStream));
                        // Each request runs in its own child so the read loop can continue.
                        children.free_go(async delegate ()
                        {
                            reply rep = new reply { id = req.id };
                            try
                            {
                                Func<async_result_wrap<object, reply_state>, object[], Task> handler;
                                if (methods._methodsMap.TryGetValue(req.name, out handler))
                                {
                                    // Handlers report through result: value1 = return value, value2 = state (success unless changed).
                                    async_result_wrap<object, reply_state> result = new async_result_wrap<object, reply_state> { value2 = reply_state.success };
                                    try
                                    {
                                        await handler(result, req.param);
                                        rep.result = result.value1;
                                        rep.state = result.value2;
                                    }
                                    catch (generator.stop_exception)
                                    {
                                        rep.state = reply_state.remote_stopped;
                                        throw;
                                    }
                                    catch (System.Exception ec)
                                    {
                                        rep.state = reply_state.remote_exception;
                                        rep.message = ec.Message;
                                    }
                                }
                                else
                                {
                                    // No handler registered under this name.
                                    rep.state = reply_state.remote_fail;
                                }
                            }
                            finally
                            {
                                // The reply is queued on every path, including when a stop is propagating.
                                await generator.unsafe_chan_send(replyRes, replyChan, rep);
                            }
                        });
                    }
                }
                catch (System.Exception) { }
                finally
                {
                    // Teardown, bracketed by lock_stop/unlock_stop: timer, socket, reply channel, then children.
                    generator.lock_stop();
                    overtimer?.cancel();
                    socket.close();
                    await generator.chan_close(replyChan, true);
                    await children.stop();
                    generator.unlock_stop();
                }
            }
        }

        /// <summary>
        /// Outcome of a non-throwing RPC invocation: the reply state, the optional error
        /// message, and the result value when the call succeeded.
        /// </summary>
        public struct invoke_result<R>
        {
            /// <summary>State reported for the call.</summary>
            public reply_state state;
            /// <summary>Error message accompanying a failed call, if any.</summary>
            public string message;
            /// <summary>Result value; only meaningful when <see cref="state"/> is success.</summary>
            public R result;

            /// <summary>
            /// Unwraps the result, throwing <see cref="exception"/> with the state and message
            /// when the call did not succeed.
            /// </summary>
            public static implicit operator R(invoke_result<R> rval)
            {
                if (reply_state.success == rval.state)
                {
                    return rval.result;
                }
                throw new exception(rval.state, rval.message);
            }

            /// <summary>Formats the result on success, or the state otherwise.</summary>
            public override string ToString()
            {
                string typeName = typeof(R).Name;
                if (reply_state.success == state)
                {
                    return string.Format("reply_state<{0}>.msg={1}", typeName, result);
                }
                return string.Format("reply_state<{0}>.state={1}", typeName, state);
            }
        }

        public class caller
        {
            shared_strand _strand;
            protected socket _socket;
            protected generator _client;
            csp_chan<tuple<reply_state, object, string>, tuple<int, string, object[]>> _rpcChan;

            /// <summary>
            /// Wraps an existing socket for issuing RPC calls. Uses <paramref name="strand"/>,
            /// or <c>shared_strand.default_strand()</c> when none is given.
            /// </summary>
            public caller(socket sck, shared_strand strand = null)
            {
                if (null != strand)
                {
                    _strand = strand;
                }
                else
                {
                    _strand = shared_strand.default_strand();
                }
                _socket = sck;
            }

            /// <summary>Returns the strand this caller was constructed with (or the default strand).</summary>
            public shared_strand self_strand() => _strand;

            /// <summary>
            /// Starts the client generator once: creates the RPC channel and runs
            /// <c>client_action</c> on the caller's strand.
            /// </summary>
            /// <returns>True when the client was started by this call; false when one already exists.</returns>
            public bool run(bool keepalive = false)
            {
                if (null != _client)
                {
                    // Already started; nothing to do.
                    return false;
                }
                _rpcChan = new csp_chan<tuple<reply_state, object, string>, tuple<int, string, object[]>>(_strand);
                _client = generator.make(_strand, functional.bind(client_action, keepalive));
                _client.run();
                return true;
            }

            /// <summary>
            /// Closes the underlying socket and then stops the client generator, if
            /// <c>run</c> has created one.
            /// </summary>
            public void close()
            {
                _socket.close();
                _client?.stop();
            }

            public Task wait_disconnect()
            {
                return generator.wait_other(_client);
            }

            public void sync_wait_disconnect()
            {
                generator.sync_go(_client.strand, wait_disconnect);
            }

            public async Task<R> call<R>(string name, params object[] args)
            {
                csp_invoke_wrap<tuple<reply_state, object, string>> result = await generator.csp_invoke(_rpcChan, new tuple<int, string, object[]>(-1, name, args));
                if (chan_state.ok != result.state)
                {
                    throw new exception(reply_state.remote_disconnect);
                }
                else if (reply_state.success != result.result.value1)
                {
                    throw new exception(result.result.value1, result.result.value3);
                }
                return (R)result.result.value2;
            }

            public async Task<invoke_result<R>> invoke<R>(string name, params object[] args)
            {
                csp_invoke_wrap<tuple<reply_state, object, string>> result = await generator.csp_invoke(_rpcChan, new tuple<int, string, object[]>(-1, name, args));
                if (chan_state.ok != result.state)
                {
                    return new invoke_result<R> { state = reply_state.remote_disconnect };
                }
                else if (reply_state.success != result.result.value1)
                {
                    return new invoke_result<R> { state = result.result.value1, message = result.result.value3 };
                }
                return new invoke_result<R> { state = reply_state.success, result = (R)result.result.value2 };
            }

            public async Task<R> timed_call<R>(int ms, string name, params object[] args)
            {
                csp_invoke_wrap<tuple<reply_state, object, string>> result = await generator.csp_invoke(_rpcChan, new tuple<int, string, object[]>(ms, name, args));
                if (chan_state.ok != result.state)
                {
                    throw new exception(reply_state.remote_disconnect);
                }
                else if (reply_state.success != result.result.value1)
                {
                    throw new exception(result.result.value1, result.result.value3);
                }
                return (R)result.result.value2;
            }

            public async Task<invoke_result<R>> timed_invoke<R>(int ms, string name, params object[] args)
            {
                csp_invoke_wrap<tuple<reply_state, object, string>> result = await generator.csp_invoke(_rpcChan, new tuple<int, string, object[]>(ms, name, args));
                if (chan_state.ok != result.state)
                {
                    return new invoke_result<R> { state = reply_state.remote_disconnect };
                }
                else if (reply_state.success != result.result.value1)
                {
                    return new invoke_result<R> { state = result.result.value1, message = result.result.value3 };
                }
                return new invoke_result<R> { state = reply_state.success, result = (R)result.result.value2 };
            }

            public Task<object> call(string name, params object[] args)
            {
                return call<object>(name, args);
            }

            public Task<invoke_result<object>> invoke(string name, params object[] args)
            {
                return invoke<object>(name, args);
            }

            public object sync_invoke(string name, params object[] args)
            {
                return generator.sync_go(_client.strand, () => call(name, args));
            }

            public R sync_invoke<R>(string name, params object[] args)
            {
                return (R)sync_invoke(name, args);
            }

            public Task<object> timed_call(int ms, string name, params object[] args)
            {
                return timed_call<object>(ms, name, args);
            }

            public Task<invoke_result<object>> timed_invoke(int ms, string name, params object[] args)
            {
                return timed_invoke<object>(ms, name, args);
            }

            public object sync_timed_invoke(int ms, string name, params object[] args)
            {
                return generator.sync_go(_client.strand, () => timed_call(ms, name, args));
            }

            public R sync_timed_invoke<R>(int ms, string name, params object[] args)
            {
                return (R)sync_timed_invoke(ms, name, args);
            }

            private async Task client_action(bool keepalive)
            {
                generator.children children = new generator.children();
                reply_map reqMap = new reply_map();
                generator cliSelf = generator.self;
                children.go(async delegate ()
                {
                    rpc_block replyBuf = new rpc_block { buff = new byte[1 * 1024 * 1024] };
                    MemoryStream replyStream = replyBuf.stream();
                    BinaryFormatter deserializer = new BinaryFormatter();
                    async_timer overtimer = keepalive ? new async_timer(generator.self_strand()) : null;
                    overtimer?.timeout(10000, cliSelf.stop);
                    try
                    {
                        async_result_wrap<socket_result> sckRes = new async_result_wrap<socket_result>();
                        while (true)
                        {
                            await _socket.unsafe_read(sckRes, replyBuf.head());
                            if (!sckRes.value1)
                            {
                                break;
                            }
                            overtimer?.restart();
                            int repLen = replyBuf.length;
                            if (repLen <= 0)
                            {
                                continue;
                            }
                            if (repLen > replyBuf.max_length)
                            {
                                break;
                            }
                            await _socket.unsafe_read(sckRes, replyBuf.body());
                            if (!sckRes.value1)
                            {
                                break;
                            }
                            replyStream.Position = 0;
                            reply req = await generator.send_task(() => (reply)deserializer.Deserialize(replyStream));
                            tuple<async_timer, csp_wait_wrap<tuple<reply_state, object, string>, tuple<int, string, object[]>>> cspReq;
                            if (reqMap.TryGetValue(req.id, out cspReq))
                            {
                                reqMap.Remove(req.id);
                                cspReq.value1?.cancel();
                                cspReq.value2.complete(new tuple<reply_state, object, string>(req.state, req.result, req.message));
                            }
                        }
                    }
                    catch (System.Exception) { }
                    overtimer?.cancel();
                    close();
                });
                rpc_block reqBuf = new rpc_block { buff = new byte[1 * 1024 * 1024] };
                MemoryStream reqStream = reqBuf.stream();
                BinaryFormatter serializer = new BinaryFormatter();
                long count = 0;
                try
                {
                    async_result_wrap<socket_result> sckRes = new async_result_wrap<socket_result>();
                    async_result_wrap<csp_wait_wrap<tuple<reply_state, object, string>, tuple<int, string, object[]>>> reqRes =
                        new async_result_wrap<csp_wait_wrap<tuple<reply_state, object, string>, tuple<int, string, object[]>>>();
                    while (true)
                    {
                        await generator.unsafe_csp_timed_wait(reqRes, _rpcChan, 3000);
                        csp_wait_wrap<tuple<reply_state, object, string>, tuple<int, string, object[]>> req = reqRes.value1;
                        if (chan_state.overtime == req.state)
                        {
                            reqBuf.length = 0;
                            await _socket.unsafe_write(sckRes, reqBuf.head());
                            if (!sckRes.value1)
                            {
                                break;
                            }
                            continue;
                        }
                        request rpcReq = new request { id = count++, name = req.msg.value2, param = req.msg.value3 };
                        async_timer timer = req.msg.value1 >= 0 ? new async_timer(_strand) : null;
                        reqMap.Add(rpcReq.id, tuple.make(timer, req));
                        reqStream.Position = 0;
                        try
                        {
                            timer?.timeout(req.msg.value1, delegate ()
                            {
                                reqMap.Remove(rpcReq.id);
                                req.complete(new tuple<reply_state, object, string>(reply_state.remote_overtime, null, null));
                            });
                            await generator.send_task(() => serializer.Serialize(reqStream, rpcReq));
                            reqBuf.length = (int)reqStream.Position;
                            await _socket.unsafe_write(sckRes, reqBuf.entirety());
                            if (!sckRes.value1)
                            {
                                break;
                            }
                        }
                        catch (System.NotSupportedException)
                        {
                            timer?.cancel();
                            reqMap.Remove(rpcReq.id);
                            req.complete(new tuple<reply_state, object, string>(reply_state.param_so_big, null, null));
                        }
                        catch (System.Runtime.Serialization.SerializationException)
                        {
                            timer?.cancel();
                            reqMap.Remove(rpcReq.id);
                            req.complete(new tuple<reply_state, object, string>(reply_state.param_nonserialized, null, null));
                        }
                    }
                }
                catch (System.Exception) { }
                finally
                {
                    generator.lock_stop();
                    _socket.close();
                    _rpcChan.close(true);
                    await children.stop();
                    foreach (KeyValuePair<long, tuple<async_timer, csp_wait_wrap<tuple<reply_state, object, string>, tuple<int, string, object[]>>>> pair in reqMap)
                    {
                        pair.Value.value1?.cancel();
                        pair.Value.value2.complete(new tuple<reply_state, object, string>(reply_state.remote_disconnect, null, null));
                    }
                    generator.unlock_stop();
                }
            }
        }

        /// <summary>
        /// A <see cref="caller"/> that creates its own TCP socket and starts the RPC
        /// loop as soon as the connection is established.
        /// </summary>
        public class client : caller
        {
            public client(shared_strand strand = null) : base(new socket_tcp(), strand)
            {
            }

            /// <summary>
            /// Connects the socket and, on success, starts the client generator.
            /// </summary>
            /// <returns>false if the connection failed or the generator had already been started.</returns>
            public async Task<bool> connect(string ip, int port, bool keepalive = false)
            {
                if (!await ((socket_tcp)_socket).connect(ip, port))
                {
                    return false;
                }
                return run(keepalive);
            }

            /// <summary>
            /// Runs <see cref="connect"/> through generator.sync_go on this client's strand.
            /// </summary>
            public bool sync_connect(string ip, int port, bool keepalive = false)
                => generator.sync_go(self_strand(), () => connect(ip, port, keepalive));
        }

        // Handshake request: serialized by named_socket_client.connect and
        // deserialized by named_socket_server.session.
        [Serializable]
        struct named_socket_shake
        {
            // Name the connection is requested under; the server uses it as the key of its pipe map.
            public string name;
            // Wait time the server passes to csp_timed_wait. A value of 0 makes the server
            // reject the request at once when no channel is registered under the name.
            public int timeout;
        }

        // Handshake answer: serialized by named_socket_server.session and
        // deserialized by named_socket_client.connect.
        [Serializable]
        struct named_socket_shake_reply
        {
            // Echo of the name from the request.
            public string name;
            // success when the socket was handed to a waiter, remote_fail otherwise.
            public reply_state state;
        }

        /// <summary>
        /// Accepts TCP connections that introduce themselves with a name and hands each
        /// one to the local code waiting for that name in <see cref="wait_connection"/>.
        /// </summary>
        public class named_socket_server
        {
            shared_strand _strand;
            socket_tcp.acceptor _accept;
            generator _server;
            // One rendezvous channel per name; accessed from code running on _strand.
            Dictionary<string, csp_chan<socket_tcp, void_type>> _pipeMap;

            /// <param name="strand">Strand for the server generator; the default strand when null.</param>
            public named_socket_server(shared_strand strand = null)
            {
                _strand = null != strand ? strand : shared_strand.default_strand();
                _accept = new socket_tcp.acceptor();
                _pipeMap = new Dictionary<string, csp_chan<socket_tcp, void_type>>();
            }

            public shared_strand self_strand()
            {
                return _strand;
            }

            /// <summary>
            /// Binds the acceptor and starts the accept loop.
            /// </summary>
            /// <returns>false when the acceptor could not be bound.</returns>
            public bool run(string ip, int port)
            {
                if (_accept.bind(ip, port))
                {
                    _server = generator.make(_strand, server_action);
                    _server.run();
                    return true;
                }
                return false;
            }

            /// <summary>
            /// Stops the server generator if it was started.
            /// </summary>
            public void close()
            {
                _server?.stop();
            }

            /// <summary>
            /// Waits for a remote peer to connect under <paramref name="name"/>.
            /// </summary>
            /// <param name="name">Name the peer has to present in its handshake.</param>
            /// <param name="ms">Timeout passed to csp_timed_invoke (default -1).</param>
            /// <returns>The result field of the timed invoke: the handed-over socket on success.</returns>
            public async Task<socket_tcp> wait_connection(string name, int ms = -1)
            {
                // True when this call created the channel and therefore has to unregister it.
                bool isMineChan = false;
                try
                {
                    csp_chan<socket_tcp, void_type> resChan = null;
                    // The map is only touched on _strand; the hop is bracketed by
                    // lock_suspend_and_stop / unlock_suspend_and_stop.
                    generator.lock_suspend_and_stop();
                    await generator.send_strand(_strand, delegate ()
                    {
                        if (!_pipeMap.TryGetValue(name, out resChan))
                        {
                            isMineChan = true;
                            resChan = new csp_chan<socket_tcp, void_type>(_strand);
                            _pipeMap.Add(name, resChan);
                        }
                    });
                    await generator.unlock_suspend_and_stop();
                    // NOTE(review): the lambda closes a socket; presumably it handles a result
                    // that was produced but can no longer be delivered - confirm against csp_timed_invoke.
                    return (await generator.csp_timed_invoke(resChan, ms, (socket_tcp socket) => socket.close())).result;
                }
                finally
                {
                    if (isMineChan)
                    {
                        generator.lock_suspend_and_stop();
                        await generator.send_strand(_strand, delegate ()
                        {
                            _pipeMap.Remove(name);
                        });
                        await generator.unlock_suspend_and_stop();
                    }
                }
            }

            /// <summary>
            /// Accept loop: every accepted socket is served by a <see cref="session"/> child.
            /// </summary>
            private async Task server_action()
            {
                generator.children children = new generator.children();
                try
                {
                    while (true)
                    {
                        socket_tcp newSck = new socket_tcp();
                        if (!await _accept.accept(newSck))
                        {
                            break;
                        }
                        // At most 100 sessions at a time; further connections are closed
                        // and the loop backs off before accepting again.
                        if (children.count < 100)
                        {
                            children.free_go(functional.bind(session, newSck));
                        }
                        else
                        {
                            newSck.close();
                            await generator.sleep(100);
                        }
                    }
                }
                catch (System.Exception) { }
                finally
                {
                    generator.lock_stop();
                    _accept.close();
                    await children.stop();
                    generator.unlock_stop();
                }
            }

            /// <summary>
            /// Handles one accepted socket: reads the handshake, then either hands the
            /// socket to the waiter registered under the requested name or answers with
            /// remote_fail and closes it.
            /// </summary>
            private async Task session(socket_tcp socket)
            {
                rpc_block reqBuf = new rpc_block { buff = new byte[4096] };
                MemoryStream reqStream = reqBuf.stream();
                BinaryFormatter serializer = new BinaryFormatter();
                try
                {
                    if (!await socket.read(reqBuf.head()))
                    {
                        socket.close();
                        return;
                    }
                    int reqLen = reqBuf.length;
                    // Reject empty or oversized frames as well as a failed body read.
                    if (reqLen <= 0 || reqLen > reqBuf.max_length || !await socket.read(reqBuf.body()))
                    {
                        socket.close();
                        return;
                    }
                    named_socket_shake req = await generator.send_task(() => (named_socket_shake)serializer.Deserialize(reqStream));
                    bool isMineChan = false;
                    csp_chan<socket_tcp, void_type> waitConn;
                    // Serializes a reply into the (reused) request buffer and returns the whole frame.
                    Func<named_socket_shake_reply, ArraySegment<byte>> shakeReply = delegate (named_socket_shake_reply reply)
                    {
                        reqStream.Position = 0;
                        serializer.Serialize(reqStream, reply);
                        reqBuf.length = (int)reqStream.Position;
                        return reqBuf.entirety();
                    };
                    try
                    {
                        // Nobody registered under this name and the peer does not want to wait: fail now.
                        if (!_pipeMap.TryGetValue(req.name, out waitConn) && 0 == req.timeout)
                        {
                            await socket.write(await generator.send_task(() => shakeReply(new named_socket_shake_reply { name = req.name, state = reply_state.remote_fail })));
                            socket.close();
                            return;
                        }
                        else if (null == waitConn)
                        {
                            // No channel yet but the peer is willing to wait: register one for
                            // wait_connection to find; it is removed again in the finally block.
                            isMineChan = true;
                            waitConn = new csp_chan<socket_tcp, void_type>(_strand);
                            _pipeMap.Add(req.name, waitConn);
                        }
                        // The handler runs when a waiter is matched: it sends the success reply and
                        // returns the socket; if that write fails it closes the socket and calls csp_fail.
                        if (chan_state.ok != await generator.csp_timed_wait(waitConn, req.timeout, functional.acry(async delegate ()
                        {
                            if (!await socket.write(await generator.send_task(() => shakeReply(new named_socket_shake_reply { name = req.name, state = reply_state.success }))))
                            {
                                socket.close();
                                generator.csp_fail();
                            }
                            return socket;
                        })))
                        {
                            await socket.write(await generator.send_task(() => shakeReply(new named_socket_shake_reply { name = req.name, state = reply_state.remote_fail })));
                            socket.close();
                        }
                    }
                    finally
                    {
                        if (isMineChan)
                        {
                            _pipeMap.Remove(req.name);
                        }
                    }
                }
                catch (System.Exception)
                {
                    socket.close();
                }
            }
        }

        /// <summary>
        /// Client side of the named-socket handshake served by <see cref="named_socket_server"/>.
        /// </summary>
        public static class named_socket_client
        {
            /// <summary>
            /// Connects to the server and requests the connection registered under <paramref name="name"/>.
            /// </summary>
            /// <param name="name">Name sent in the handshake.</param>
            /// <param name="ip">Server address.</param>
            /// <param name="port">Server port.</param>
            /// <param name="timeout">Wait time forwarded to the server in the handshake (default -1).</param>
            /// <returns>
            /// The connected socket when the server answers with success; otherwise null,
            /// in which case the socket has been closed.
            /// </returns>
            public static async Task<socket_tcp> connect(string name, string ip, int port, int timeout = -1)
            {
                socket_tcp socket = new socket_tcp();
                try
                {
                    if (!await socket.connect(ip, port))
                    {
                        socket.close();
                        return null;
                    }
                    rpc_block reqBuf = new rpc_block { buff = new byte[4096] };
                    MemoryStream reqStream = reqBuf.stream();
                    BinaryFormatter serializer = new BinaryFormatter();
                    await generator.send_task(() => serializer.Serialize(reqStream, new named_socket_shake { name = name, timeout = timeout }));
                    reqBuf.length = (int)reqStream.Position;
                    // Send the handshake, then read the header of the answer.
                    if (!await socket.write(reqBuf.entirety()) || !await socket.read(reqBuf.head()))
                    {
                        socket.close();
                        return null;
                    }
                    int reqLen = reqBuf.length;
                    if (reqLen <= 0 || reqLen > reqBuf.max_length || !await socket.read(reqBuf.body()))
                    {
                        socket.close();
                        return null;
                    }
                    // The same buffer is reused for the answer, so rewind before decoding.
                    reqStream.Position = 0;
                    named_socket_shake_reply reply = await generator.send_task(() => (named_socket_shake_reply)serializer.Deserialize(reqStream));
                    if (reply_state.success == reply.state)
                    {
                        return socket;
                    }
                    // The server refused the handshake: release the socket instead of leaking it.
                    socket.close();
                }
                catch (generator.stop_exception)
                {
                    // Being stopped must propagate to the generator; only clean up here.
                    socket.close();
                    throw;
                }
                catch (System.Exception)
                {
                    socket.close();
                }
                return null;
            }
        }

        // Comparison selector sent by state_client with a "wait_cmp" request.
        // state_server.cmp evaluates `a OP b`, where a is the value supplied by the
        // client and b is the current element of the state buffer.
        enum state_cmp
        {
            lt, // a <  b
            le, // a <= b
            gt, // a >  b
            ge, // a >= b
            eq, // a == b
            ne  // a != b
        }

        /// <summary>
        /// Exposes an int array over RPC: single and ranged reads and writes plus a
        /// polling wait until an element satisfies a comparison.
        /// </summary>
        public class state_server
        {
            server _rpcServer;
            int[] _stateBuff;

            /// <param name="stateBuff">Array served to clients; it is used directly, not copied.</param>
            /// <param name="strand">Strand passed on to the underlying RPC server.</param>
            public state_server(int[] stateBuff, shared_strand strand = null)
            {
                _rpcServer = new server(strand);
                _stateBuff = stateBuff;
                _rpcServer.bind("read", delegate (int idx)
                {
                    return _stateBuff[idx];
                });
                _rpcServer.bind("write", delegate (int idx, int value)
                {
                    _stateBuff[idx] = value;
                });
                _rpcServer.bind("reads", delegate (int idx, int length)
                {
                    int[] res = new int[length];
                    // Array.Copy works in elements. Buffer.BlockCopy, used here before, takes
                    // byte offsets and byte counts and therefore copied the wrong region.
                    Array.Copy(_stateBuff, idx, res, 0, length);
                    return res;
                });
                _rpcServer.bind("writes", delegate (int idx, int[] values)
                {
                    values.CopyTo(_stateBuff, idx);
                });
                // Polls the element until cmp(...) holds. A negative ms waits without limit;
                // otherwise false is returned once ms have elapsed on system_tick.
                _rpcServer.bind("wait_cmp", functional.acry(async delegate (int idx, tuple<state_cmp, int> value, int ms)
                {
                    if (ms < 0)
                    {
                        while (!cmp(value.value1, value.value2, _stateBuff[idx]))
                        {
                            await generator.sleep(1);
                        }
                    }
                    else
                    {
                        long bg = system_tick.get_tick_ms();
                        while (!cmp(value.value1, value.value2, _stateBuff[idx]))
                        {
                            if (system_tick.get_tick_ms() - bg >= ms)
                            {
                                return false;
                            }
                            await generator.sleep(1);
                        }
                    }
                    return true;
                }));
            }

            public shared_strand self_strand()
            {
                return _rpcServer.self_strand();
            }

            public bool run(string ip, int port, bool keepalive = false)
            {
                return _rpcServer.run(ip, port, keepalive);
            }

            public void close()
            {
                _rpcServer.close();
            }

            /// <summary>
            /// Evaluates <c>a OP b</c> for the given operator; unknown operators yield false.
            /// </summary>
            /// <remarks>
            /// NOTE(review): "wait_cmp" passes the client-supplied value as <paramref name="a"/>
            /// and the buffer element as <paramref name="b"/>, so e.g. lt means
            /// "value &lt; state" - confirm this operand order is the intended one.
            /// </remarks>
            private bool cmp(state_cmp type, int a, int b)
            {
                switch (type)
                {
                    case state_cmp.lt: return a < b;
                    case state_cmp.le: return a <= b;
                    case state_cmp.gt: return a > b;
                    case state_cmp.ge: return a >= b;
                    case state_cmp.eq: return a == b;
                    case state_cmp.ne: return a != b;
                }
                return false;
            }
        }

        /// <summary>
        /// Typed client for <see cref="state_server"/>. Every operation is available as
        /// a Task-returning method and as a sync_ variant that runs it through
        /// generator.sync_go on the client's strand.
        /// </summary>
        public class state_client
        {
            client _rpcClient;

            public state_client(shared_strand strand = null)
            {
                _rpcClient = new client(strand);
            }

            public shared_strand self_strand() => _rpcClient.self_strand();

            // ---- connection management ----

            public Task<bool> connect(string ip, int port, bool keepalive = false)
                => _rpcClient.connect(ip, port, keepalive);

            public bool sync_connect(string ip, int port, bool keepalive = false)
                => generator.sync_go(self_strand(), () => connect(ip, port, keepalive));

            public Task wait_disconnect() => _rpcClient.wait_disconnect();

            public void sync_wait_disconnect() => _rpcClient.sync_wait_disconnect();

            public void close() => _rpcClient.close();

            // ---- element access ----

            public Task<int> read(int idx) => _rpcClient.call<int>("read", idx);

            public Task write(int idx, int value) => _rpcClient.call("write", idx, value);

            public Task<int[]> reads(int idx, int length) => _rpcClient.call<int[]>("reads", idx, length);

            public Task writes(int idx, int[] values) => _rpcClient.call("writes", idx, values);

            // ---- waits: each issues "wait_cmp" with the matching comparison selector ----

            public Task<bool> wait_lt(int idx, int value, int ms = -1) => remote_wait(state_cmp.lt, idx, value, ms);

            public Task<bool> wait_le(int idx, int value, int ms = -1) => remote_wait(state_cmp.le, idx, value, ms);

            public Task<bool> wait_gt(int idx, int value, int ms = -1) => remote_wait(state_cmp.gt, idx, value, ms);

            public Task<bool> wait_ge(int idx, int value, int ms = -1) => remote_wait(state_cmp.ge, idx, value, ms);

            public Task<bool> wait_eq(int idx, int value, int ms = -1) => remote_wait(state_cmp.eq, idx, value, ms);

            public Task<bool> wait_ne(int idx, int value, int ms = -1) => remote_wait(state_cmp.ne, idx, value, ms);

            // Shared request path of the wait_* family: (index, (selector, value), timeout).
            private Task<bool> remote_wait(state_cmp op, int idx, int value, int ms)
                => _rpcClient.call<bool>("wait_cmp", idx, new tuple<state_cmp, int>(op, value), ms);

            // ---- blocking variants ----

            public int sync_read(int idx) => generator.sync_go(self_strand(), () => read(idx));

            public void sync_write(int idx, int value) => generator.sync_go(self_strand(), () => write(idx, value));

            public int[] sync_reads(int idx, int length) => generator.sync_go(self_strand(), () => reads(idx, length));

            public void sync_writes(int idx, int[] values) => generator.sync_go(self_strand(), () => writes(idx, values));

            public bool sync_wait_lt(int idx, int value, int ms = -1)
                => generator.sync_go(self_strand(), () => wait_lt(idx, value, ms));

            public bool sync_wait_le(int idx, int value, int ms = -1)
                => generator.sync_go(self_strand(), () => wait_le(idx, value, ms));

            public bool sync_wait_gt(int idx, int value, int ms = -1)
                => generator.sync_go(self_strand(), () => wait_gt(idx, value, ms));

            public bool sync_wait_ge(int idx, int value, int ms = -1)
                => generator.sync_go(self_strand(), () => wait_ge(idx, value, ms));

            public bool sync_wait_eq(int idx, int value, int ms = -1)
                => generator.sync_go(self_strand(), () => wait_eq(idx, value, ms));

            public bool sync_wait_ne(int idx, int value, int ms = -1)
                => generator.sync_go(self_strand(), () => wait_ne(idx, value, ms));
        }

        /// <summary>
        /// In-memory string-keyed store exposed over RPC through the methods
        /// "set", "get", "del", "has", "clr", "cnt" and "upd".
        /// </summary>
        public class kv_server
        {
            server _rpcServer;
            Dictionary<string, object> _kvMap;

            /// <param name="strand">Strand passed on to the underlying RPC server.</param>
            public kv_server(shared_strand strand = null)
            {
                _rpcServer = new server(strand);
                _kvMap = new Dictionary<string, object>();
                // Insert only: false when the key already exists (or is null), true otherwise.
                _rpcServer.bind("set", delegate (string key, object value)
                {
                    if (null == key || _kvMap.ContainsKey(key))
                    {
                        return false;
                    }
                    _kvMap.Add(key, value);
                    return true;
                });
                // Returns (found, value); value is null when the key is absent.
                _rpcServer.bind("get", delegate (string key)
                {
                    object res;
                    return new tuple<bool, object>(_kvMap.TryGetValue(key, out res), res);
                });
                _rpcServer.bind("del", delegate (string key)
                {
                    return _kvMap.Remove(key);
                });
                _rpcServer.bind("has", delegate (string key)
                {
                    return _kvMap.ContainsKey(key);
                });
                _rpcServer.bind("clr", delegate ()
                {
                    _kvMap.Clear();
                });
                _rpcServer.bind("cnt", delegate ()
                {
                    return _kvMap.Count;
                });
                // Insert or overwrite. With mustExist the key has to be present already,
                // otherwise nothing is stored and false is returned.
                _rpcServer.bind("upd", delegate (string key, object value, bool mustExist)
                {
                    if (mustExist && !_kvMap.ContainsKey(key))
                    {
                        return false;
                    }
                    // The indexer overwrites an existing entry. The previous Add() threw
                    // on exactly the case mustExist asks for: a key that is already present.
                    _kvMap[key] = value;
                    return true;
                });
            }

            public shared_strand self_strand()
            {
                return _rpcServer.self_strand();
            }

            public bool run(string ip, int port, bool keepalive = false)
            {
                return _rpcServer.run(ip, port, keepalive);
            }

            public void close()
            {
                _rpcServer.close();
            }
        }

        /// <summary>
        /// Key/value client built on an RPC <c>client</c>. Each data operation forwards to a named
        /// remote method ("get", "set", "del", "has", "clr", "cnt", "upd"). Task-returning methods
        /// have a blocking <c>sync_</c> counterpart; except for <c>sync_wait_disconnect</c>, those
        /// counterparts run the Task-returning method through <c>generator.sync_go</c> on the
        /// strand returned by <c>self_strand()</c>.
        /// </summary>
        public class kv_client
        {
            client _client;

            /// <summary>
            /// Creates the underlying RPC client, passing <paramref name="strand"/> straight through.
            /// </summary>
            public kv_client(shared_strand strand = null)
            {
                _client = new client(strand);
            }

            /// <summary>Returns the strand reported by the underlying RPC client.</summary>
            public shared_strand self_strand()
            {
                shared_strand strand = _client.self_strand();
                return strand;
            }

            // ---- connection management ----

            /// <summary>Forwards to the RPC client's <c>connect</c> with the same arguments.</summary>
            public Task<bool> connect(string ip, int port, bool keepalive = false)
            {
                return _client.connect(ip, port, keepalive);
            }

            /// <summary>Blocking form of <c>connect</c>.</summary>
            public bool sync_connect(string ip, int port, bool keepalive = false)
            {
                bool connected = generator.sync_go(self_strand(), () => connect(ip, port, keepalive));
                return connected;
            }

            /// <summary>Forwards to the RPC client's <c>wait_disconnect</c>.</summary>
            public Task wait_disconnect()
            {
                return _client.wait_disconnect();
            }

            /// <summary>Forwards to the RPC client's own <c>sync_wait_disconnect</c>.</summary>
            public void sync_wait_disconnect()
            {
                _client.sync_wait_disconnect();
            }

            /// <summary>Closes the underlying RPC client.</summary>
            public void close()
            {
                _client.close();
            }

            // ---- get ----

            /// <summary>
            /// Calls remote "get" and returns the raw reply: a flag plus an untyped value.
            /// </summary>
            public Task<tuple<bool, object>> get(string key)
            {
                return _client.call<tuple<bool, object>>("get", key);
            }

            /// <summary>
            /// Typed variant of <c>get</c>. When the reply flag is true the value is cast to
            /// <typeparamref name="R"/>; otherwise <c>default(R)</c> is returned alongside false.
            /// </summary>
            public async Task<tuple<bool, R>> get<R>(string key)
            {
                tuple<bool, object> reply = await _client.call<tuple<bool, object>>("get", key);
                if (!reply.value1)
                {
                    return new tuple<bool, R>(reply.value1, default(R));
                }
                return new tuple<bool, R>(reply.value1, (R)reply.value2);
            }

            /// <summary>Blocking form of the untyped <c>get</c>.</summary>
            public tuple<bool, object> sync_get(string key)
            {
                tuple<bool, object> reply = generator.sync_go(self_strand(), () => get(key));
                return reply;
            }

            /// <summary>Blocking form of the typed <c>get</c>.</summary>
            public tuple<bool, R> sync_get<R>(string key)
            {
                tuple<bool, R> reply = generator.sync_go(self_strand(), () => get<R>(key));
                return reply;
            }

            // ---- set / update ----

            /// <summary>Calls remote "set" with (key, value) and returns its boolean reply.</summary>
            public Task<bool> set(string key, object value)
            {
                return _client.call<bool>("set", key, value);
            }

            /// <summary>Blocking form of <c>set</c>.</summary>
            public bool sync_set(string key, object value)
            {
                bool reply = generator.sync_go(self_strand(), () => set(key, value));
                return reply;
            }

            /// <summary>
            /// Calls remote "upd" with (key, value, mustExist) and returns its boolean reply.
            /// </summary>
            public Task<bool> update(string key, object value, bool mustExist = false)
            {
                return _client.call<bool>("upd", key, value, mustExist);
            }

            /// <summary>Blocking form of <c>update</c>.</summary>
            public bool sync_update(string key, object value, bool mustExist = false)
            {
                bool reply = generator.sync_go(self_strand(), () => update(key, value, mustExist));
                return reply;
            }

            // ---- delete / has ----

            /// <summary>Calls remote "del" with the key and returns its boolean reply.</summary>
            public Task<bool> delete(string key)
            {
                return _client.call<bool>("del", key);
            }

            /// <summary>Blocking form of <c>delete</c>.</summary>
            public bool sync_delete(string key)
            {
                bool reply = generator.sync_go(self_strand(), () => delete(key));
                return reply;
            }

            /// <summary>Calls remote "has" with the key and returns its boolean reply.</summary>
            public Task<bool> has(string key)
            {
                return _client.call<bool>("has", key);
            }

            /// <summary>Blocking form of <c>has</c>.</summary>
            public bool sync_has(string key)
            {
                bool reply = generator.sync_go(self_strand(), () => has(key));
                return reply;
            }

            // ---- clear / count ----

            /// <summary>Calls remote "clr" with no arguments.</summary>
            public Task clear()
            {
                return _client.call("clr");
            }

            /// <summary>Blocking form of <c>clear</c>.</summary>
            public void sync_clear()
            {
                generator.sync_go(self_strand(), clear);
            }

            /// <summary>Calls remote "cnt" with no arguments and returns its integer reply.</summary>
            public Task<int> count()
            {
                return _client.call<int>("cnt");
            }

            /// <summary>Blocking form of <c>count</c>.</summary>
            public int sync_count()
            {
                int total = generator.sync_go(self_strand(), count);
                return total;
            }
        }

        /// <summary>
        /// Line-oriented text layer over a <c>socket</c>. Incoming bytes are split into UTF-8
        /// lines on CR and/or LF; outgoing strings are UTF-8 encoded and followed by a selectable
        /// separator.
        /// </summary>
        public class text_socket
        {
            /// <summary>
            /// Line terminator appended by <c>write_line</c>. The numeric values index the
            /// <c>separats</c> table, so they must stay in sync with it.
            /// </summary>
            public enum separat
            {
                CR = 0,
                LF = 1,
                CRLF = 2
            }

            // Size of both the read and the write buffer.
            const int buffSize = 4096;
            // Chars encoded per socket write. One UTF-16 code unit yields at most 3 UTF-8 bytes,
            // so maxWriteChars * 3 plus a 2-byte separator always fits in buffSize.
            const int maxWriteChars = 1024;

            int _currBegin; // first byte of the pending (not yet returned) line in _readBuff
            int _currEnd;   // one past the last received byte in _readBuff
            int _currCur;   // next byte to examine for a line terminator
            readonly socket _socket;
            readonly byte[] _readBuff;
            readonly byte[] _writeBuff;
            static readonly byte[][] separats = { new byte[] { (byte)'\r' }, new byte[] { (byte)'\n' }, new byte[] { (byte)'\r', (byte)'\n' } };

            /// <summary>Wraps <paramref name="sck"/> and allocates the read/write buffers.</summary>
            public text_socket(socket sck)
            {
                _currBegin = 0;
                _currEnd = 0;
                _currCur = 0;
                _socket = sck;
                _readBuff = new byte[buffSize];
                _writeBuff = new byte[buffSize];
            }

            /// <summary>
            /// Reads the next non-empty line.
            /// </summary>
            /// <remarks>
            /// CR and LF each terminate a line; a terminator with no bytes before it is skipped,
            /// so blank lines (and the LF of a CRLF pair) never produce a result. A line longer
            /// than the read buffer is returned in pieces; each piece ends on a complete UTF-8
            /// sequence so multi-byte characters are not torn apart.
            /// </remarks>
            /// <returns>
            /// The decoded line without its terminator, or null when the socket read fails
            /// (any buffered, unterminated bytes are not returned in that case).
            /// </returns>
            public async Task<string> read_line()
            {
                while (true)
                {
                    for (; _currCur < _currEnd; _currCur++)
                    {
                        byte c = _readBuff[_currCur];
                        if ('\r' == c || '\n' == c)
                        {
                            if (_currBegin != _currCur)
                            {
                                string line = Encoding.UTF8.GetString(_readBuff, _currBegin, _currCur - _currBegin);
                                _currCur++;
                                _currBegin = _currCur;
                                return line;
                            }
                            // Terminator with nothing in front of it: skip it.
                            _currBegin = _currCur + 1;
                        }
                    }
                    if (_readBuff.Length == _currEnd)
                    {
                        if (0 == _currBegin)
                        {
                            // The pending line fills the whole buffer. Hand out everything up to
                            // the last complete UTF-8 sequence and keep the incomplete tail so it
                            // can be decoded together with the bytes that follow.
                            int cut = complete_utf8_length(_readBuff, _currEnd);
                            string chunk = Encoding.UTF8.GetString(_readBuff, 0, cut);
                            int tail = _currEnd - cut;
                            if (tail > 0)
                            {
                                Buffer.BlockCopy(_readBuff, cut, _readBuff, 0, tail);
                            }
                            // The tail holds only a lead byte and continuation bytes, never a
                            // terminator, so scanning may resume after it.
                            _currEnd = _currCur = tail;
                            return chunk;
                        }
                        // Buffer is full but its front is already consumed: compact to make room.
                        Buffer.BlockCopy(_readBuff, _currBegin, _readBuff, 0, _currEnd - _currBegin);
                        _currCur = _currEnd -= _currBegin;
                        _currBegin = 0;
                    }
                    socket_result sckRes;
                    if (!(sckRes = await _socket.read_same(new ArraySegment<byte>(_readBuff, _currEnd, _readBuff.Length - _currEnd))))
                    {
                        return null;
                    }
                    // sckRes.s is used as the number of bytes just received.
                    _currEnd += sckRes.s;
                }
            }

            /// <summary>
            /// Returns how many of the first <paramref name="count"/> bytes of
            /// <paramref name="buff"/> can be decoded without cutting the final UTF-8 sequence
            /// short. Returns <paramref name="count"/> itself when the data ends on a complete
            /// sequence or is not recognisable UTF-8 at the end.
            /// </summary>
            static int complete_utf8_length(byte[] buff, int count)
            {
                int lead = count - 1;
                // A UTF-8 sequence carries at most three continuation bytes (10xxxxxx).
                for (int back = 0; back < 3 && lead > 0 && 0x80 == (buff[lead] & 0xC0); back++)
                {
                    lead--;
                }
                if (lead <= 0)
                {
                    // Never hold back the whole buffer; the caller must always make progress.
                    return count;
                }
                int b = buff[lead];
                int need;
                if (0xC0 == (b & 0xE0))
                {
                    need = 2;
                }
                else if (0xE0 == (b & 0xF0))
                {
                    need = 3;
                }
                else if (0xF0 == (b & 0xF8))
                {
                    need = 4;
                }
                else
                {
                    // ASCII, a stray continuation byte or an invalid lead byte: nothing to hold back.
                    return count;
                }
                return lead + need > count ? lead : count;
            }

            /// <summary>
            /// Encodes <paramref name="txt"/> as UTF-8 and writes it followed by the separator
            /// selected by <paramref name="sep"/>. Long strings are sent in several writes of at
            /// most <c>maxWriteChars</c> chars each; the separator goes out with the last one.
            /// </summary>
            /// <param name="txt">Text to send; an empty string sends just the separator.</param>
            /// <param name="sep">Terminator to append.</param>
            /// <returns>false as soon as a socket write fails, true otherwise.</returns>
            public async Task<bool> write_line(string txt, separat sep = separat.CRLF)
            {
                int remaining = txt.Length;
                int offset = 0;
                byte[] terminator = separats[(int)sep];
                do
                {
                    int charCnt = remaining < maxWriteChars ? remaining : maxWriteChars;
                    // Do not end a chunk between the two halves of a surrogate pair: encoded
                    // separately each half would become U+FFFD and the character would be lost.
                    if (charCnt < remaining && char.IsHighSurrogate(txt[offset + charCnt - 1]))
                    {
                        charCnt--;
                    }
                    int len = Encoding.UTF8.GetBytes(txt, offset, charCnt, _writeBuff, 0);
                    offset += charCnt;
                    remaining -= charCnt;
                    if (0 == remaining)
                    {
                        terminator.CopyTo(_writeBuff, len);
                        len += terminator.Length;
                    }
                    if (!await _socket.write(new ArraySegment<byte>(_writeBuff, 0, len)))
                    {
                        return false;
                    }
                } while (remaining > 0);
                return true;
            }

            /// <summary>
            /// Blocking form of <c>read_line</c>, run through <c>generator.sync_go</c> on a
            /// freshly created strand.
            /// </summary>
            public string sync_read_line()
            {
                return generator.sync_go(new shared_strand(), read_line);
            }

            /// <summary>
            /// Blocking form of <c>write_line</c>, run through <c>generator.sync_go</c> on a
            /// freshly created strand.
            /// </summary>
            /// <param name="txt">Text to send.</param>
            /// <param name="sep">Terminator to append; defaults to CRLF like <c>write_line</c>.</param>
            public bool sync_write_line(string txt, separat sep = separat.CRLF)
            {
                return generator.sync_go(new shared_strand(), () => write_line(txt, sep));
            }
        }
    }
}
